Laravel Queue: A Source-Code Walkthrough of the Queue Worker

Running the Queue Worker

Worker Options

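Laravel ships with a queue worker that processes new jobs as they are pushed onto the queue. You start it with the queue:work command. Note that once queue:work has started, it keeps running until you stop it manually or close the terminal:

    php artisan queue:work

  • You can specify which connection the worker should use:

    php artisan queue:work redis

  • You can narrow the worker further by having it process only specific queues on the given connection:

    php artisan queue:work redis --queue=emails

  • The --once option processes only a single job from the queue:

    php artisan queue:work --once

  • When a job fails, it is released into the delayed queue; the --delay option sets how many seconds a failed job waits before it is retried:

    php artisan queue:work --delay=2

  • To cap the memory (in megabytes) the worker process may use, pass --memory:

    php artisan queue:work --memory=128

  • While there are jobs on the queue, the worker processes them back to back with no delay. When no job is available, the --sleep option determines how many seconds the worker "sleeps" before polling again:

    php artisan queue:work --sleep=3

  • You can specify the maximum number of seconds a single job may run before the worker process is killed:

    php artisan queue:work --timeout=60

  • You can specify how many times a job is attempted before it is logged as failed (0, the default, means no limit):

    php artisan queue:work --tries=60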

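As you can see, several of these settings can also be defined on the job class itself (the payload shown later carries maxTries and timeout taken from the job's $tries and $timeout properties), but three of them, sleep, delay, and memory, can only be set through artisan.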

Launching from the Command Line: WorkCommand

In command-line mode, the worker process runs Illuminate\Queue\Console\WorkCommand, which receives an Illuminate\Queue\Worker instance through dependency injection in its constructor:

    class WorkCommand extends Command
    {
        protected $signature = 'queue:work
                                {connection? : The name of connection}
                                {--queue= : The queue to listen on}
                                {--daemon : Run the worker in daemon mode (Deprecated)}
                                {--once : Only process the next job on the queue}
                                {--delay=0 : Amount of time to delay failed jobs}
                                {--force : Force the worker to run even in maintenance mode}
                                {--memory=128 : The memory limit in megabytes}
                                {--sleep=3 : Number of seconds to sleep when no job is available}
                                {--timeout=60 : The number of seconds a child process can run}
                                {--tries=0 : Number of times to attempt a job before logging it failed}';

        public function __construct(Worker $worker)
        {
            parent::__construct();

            $this->worker = $worker;
        }

        public function fire()
        {
            if ($this->downForMaintenance() && $this->option('once')) {
                return $this->worker->sleep($this->option('sleep'));
            }

            $this->listenForEvents();

            $connection = $this->argument('connection')
                            ?: $this->laravel['config']['queue.default'];

            $queue = $this->getQueue($connection);

            $this->runWorker(
                $connection, $queue
            );
        }
    }

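After the worker starts, it runs the fire method. Before processing any jobs, it first registers event listeners, mainly to watch for jobs that finish and jobs that fail: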

    protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, false);
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, true);

            $this->logFailedJob($event);
        });
    }

    protected function writeOutput(Job $job, $failed)
    {
        if ($failed) {
            $this->output->writeln('<error>['.Carbon::now()->format('Y-m-d H:i:s').'] Failed:</error> '.$job->resolveName());
        } else {
            $this->output->writeln('<info>['.Carbon::now()->format('Y-m-d H:i:s').'] Processed:</info> '.$job->resolveName());
        }
    }

    protected function logFailedJob(JobFailed $event)
    {
        $this->laravel['queue.failer']->log(
            $event->connectionName, $event->job->getQueue(),
            $event->job->getRawBody(), $event->exception
        );
    }
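
listenForEvents relies on listeners keyed by the event's class name. The sketch below illustrates that pattern with a hypothetical MiniEvents class (not Laravel's Illuminate\Events\Dispatcher) and stand-in JobProcessed/JobFailed classes, producing lines in the same format as writeOutput():

```php
<?php
// Hypothetical stand-ins for the queue events; Laravel's real classes carry more data.
class JobProcessed { public $jobName; public function __construct(string $n) { $this->jobName = $n; } }
class JobFailed    { public $jobName; public function __construct(string $n) { $this->jobName = $n; } }

// Hypothetical minimal dispatcher: listeners are stored per event class name.
class MiniEvents
{
    private $listeners = [];

    public function listen(string $event, callable $listener): void
    {
        $this->listeners[$event][] = $listener;
    }

    public function fire($event): void
    {
        // Only listeners registered for this exact class are called.
        foreach ($this->listeners[get_class($event)] ?? [] as $listener) {
            $listener($event);
        }
    }
}

$output = [];
$events = new MiniEvents();
$events->listen(JobProcessed::class, function ($e) use (&$output) {
    $output[] = '['.date('Y-m-d H:i:s').'] Processed: '.$e->jobName;
});
$events->listen(JobFailed::class, function ($e) use (&$output) {
    $output[] = '['.date('Y-m-d H:i:s').'] Failed: '.$e->jobName;
});

$events->fire(new JobProcessed('App\Jobs\SendWelcomeEmail'));
echo implode(PHP_EOL, $output), PHP_EOL;
```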

runWorker then starts the worker. By default it calls the daemon method of Illuminate\Queue\Worker; only when the --once option is passed does it call runNextJob instead:

    protected function runWorker($connection, $queue)
    {
        $this->worker->setCache($this->laravel['cache']->driver());

        return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }
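
runWorker chooses the method name at runtime: the expression inside the braces is evaluated first and its string result is used as the method to call. A small sketch of the same idiom, using a hypothetical FakeWorker class in place of Illuminate\Queue\Worker:

```php
<?php
// Hypothetical stand-in for Illuminate\Queue\Worker.
class FakeWorker
{
    public function runNextJob(string $connection, string $queue): string
    {
        return "runNextJob({$connection}, {$queue})";
    }

    public function daemon(string $connection, string $queue): string
    {
        return "daemon({$connection}, {$queue})";
    }
}

function runWorker(FakeWorker $worker, bool $once, string $connection, string $queue): string
{
    // Same idiom as WorkCommand::runWorker(): the ternary yields a method name.
    return $worker->{$once ? 'runNextJob' : 'daemon'}($connection, $queue);
}

echo runWorker(new FakeWorker(), true, 'redis', 'emails'), PHP_EOL;  // runNextJob(redis, emails)
echo runWorker(new FakeWorker(), false, 'redis', 'emails'), PHP_EOL; // daemon(redis, emails)
```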

Worker Job Scheduling

[Figure 1]

Next, let's look at the daemon method:

    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        $this->listenForSignals();

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
            if (! $this->daemonShouldRun($options)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            $this->registerTimeoutHandler($job, $options);

            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            $this->stopIfNecessary($options, $lastRestart);
        }
    }
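
Stripped of signals, timeouts, and framework services, the loop above boils down to "poll, run or sleep, then check whether to stop". The following is a toy sketch of that shape only; toyDaemon, the in-memory job list, and the injected $sleep callback are all hypothetical, and the loop is bounded so it terminates:

```php
<?php
// Toy version of Worker::daemon(); every name here is hypothetical.
function toyDaemon(array $jobs, int $maxLoops, callable $sleep): array
{
    $log = [];
    for ($loop = 0; $loop < $maxLoops; $loop++) {  // the real worker uses while (true)
        $job = array_shift($jobs);                 // stands in for getNextJob()
        if ($job !== null) {
            $log[] = "ran {$job}";                 // stands in for runJob()
        } else {
            $sleep(3);                             // stands in for sleep($options->sleep)
            $log[] = 'slept';
        }
        // stopIfNecessary() would run here on every iteration.
    }
    return $log;
}

$log = toyDaemon(['SendEmail', 'ResizeImage'], 3, function (int $seconds): void {
    // no-op so the example returns immediately
});
echo implode(', ', $log), PHP_EOL; // ran SendEmail, ran ResizeImage, slept
```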

Signal Handling

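listenForSignals sets up signal handling for the script; it only takes effect on PHP 7.1 or later with the pcntl extension loaded (see supportsAsyncSignals below). Signals here are asynchronous notifications that a process monitor (such as Supervisor) sends to communicate with our script.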

    protected function listenForSignals()
    {
        if ($this->supportsAsyncSignals()) {
            pcntl_async_signals(true);

            pcntl_signal(SIGTERM, function () {
                $this->shouldQuit = true;
            });

            pcntl_signal(SIGUSR2, function () {
                $this->paused = true;
            });

            pcntl_signal(SIGCONT, function () {
                $this->paused = false;
            });
        }
    }

    protected function supportsAsyncSignals()
    {
        return version_compare(PHP_VERSION, '7.1.0') >= 0 &&
               extension_loaded('pcntl');
    }

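Calling pcntl_async_signals(true) enables asynchronous signal handling, after which handlers are registered for several signals:

  • SIGTERM is raised when Supervisor tells the script to shut down; the handler sets shouldQuit.
  • SIGUSR2 is a user-defined signal that Laravel uses to mean the script should pause; the handler sets paused.
  • SIGCONT is raised when Supervisor tells a paused script to continue; the handler clears paused.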
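
The handlers only flip two flags that the main loop reads later. A sketch of that flag logic, assuming a hypothetical SignalFlags class; the signal-to-flag mapping mirrors the source above, and real pcntl handlers are registered only when async signals are available:

```php
<?php
// Hypothetical class mirroring the flag logic of Worker::listenForSignals().
class SignalFlags
{
    public $shouldQuit = false;
    public $paused = false;

    public function handle(string $signal): void
    {
        switch ($signal) {
            case 'SIGTERM': $this->shouldQuit = true; break; // checked later by stopIfNecessary()
            case 'SIGUSR2': $this->paused = true; break;     // daemonShouldRun() returns false
            case 'SIGCONT': $this->paused = false; break;    // processing resumes
        }
    }

    // Registers real handlers only under the same conditions as supportsAsyncSignals().
    public function register(): bool
    {
        if (version_compare(PHP_VERSION, '7.1.0') < 0 || ! extension_loaded('pcntl')) {
            return false;
        }
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, function () { $this->handle('SIGTERM'); });
        pcntl_signal(SIGUSR2, function () { $this->handle('SIGUSR2'); });
        pcntl_signal(SIGCONT, function () { $this->handle('SIGCONT'); });
        return true;
    }
}
```

Because the handlers only set flags, a SIGTERM never interrupts a job midway in this design; the worker exits at the next stopIfNecessary check.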

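Before any job actually runs, the worker also reads the time of the last restart from the cache (php artisan queue:restart stores the current timestamp under this key):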

    protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('illuminate:queue:restart');
        }
    }

Deciding Whether the Worker Should Process Jobs

Once inside the loop, the worker first decides whether it should process a job, pause, or exit:

    protected function daemonShouldRun(WorkerOptions $options)
    {
        return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
            $this->paused ||
            $this->events->until(new Events\Looping) === false);
    }

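The loop will not process a job in any of the following cases:

  • The application is in maintenance mode and the --force option was not given
  • The script has been paused by Supervisor
  • A listener for the Looping event returned false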
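
Those three conditions can be written as a pure function. The sketch below is hypothetical: it takes the conditions as arguments instead of reading them from the framework, and $loopingResult stands for what events->until() returned (null when no listener answered):

```php
<?php
// Hypothetical pure version of Worker::daemonShouldRun().
function daemonShouldRun(bool $downForMaintenance, bool $force, bool $paused, $loopingResult): bool
{
    return ! (($downForMaintenance && ! $force) ||
              $paused ||
              $loopingResult === false);   // only an explicit false vetoes the iteration
}

var_dump(daemonShouldRun(true, true, false, null));  // bool(true): --force overrides maintenance mode
var_dump(daemonShouldRun(false, false, false, false)); // bool(false): a Looping listener said no
```
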

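The Looping event is fired on every iteration of the loop. If daemonShouldRun returns false (for example, because a Looping listener returned false), the worker pauses via pauseWorker: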

    protected function pauseWorker(WorkerOptions $options, $lastRestart)
    {
        $this->sleep($options->sleep > 0 ? $options->sleep : 1);

        $this->stopIfNecessary($options, $lastRestart);
    }

After sleeping for a while, the worker checks again whether it needs to stop:

    protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
    {
        if ($this->shouldQuit) {
            $this->kill();
        }

        if ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($lastRestart)) {
            $this->stop();
        }
    }

    protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }

    protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('illuminate:queue:restart');
        }
    }

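The worker stops in the following cases:

  • Supervisor asked the script to quit (SIGTERM set shouldQuit)
  • The memory limit was exceeded (exit status 12)
  • The queue was restarted (the restart timestamp in the cache changed)

kill and stop are implemented as follows:
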
    public function kill($status = 0)
    {
        if (extension_loaded('posix')) {
            posix_kill(getmypid(), SIGKILL);
        }

        exit($status);
    }

    public function stop($status = 0)
    {
        $this->events->fire(new Events\WorkerStopping);

        exit($status);
    }

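When the queue has been restarted, the current process has to exit so that a new process can be started with freshly loaded code. Note that when the posix extension is loaded, kill() sends SIGKILL to its own process, so its exit($status) line is only a fallback; stop() first fires the WorkerStopping event and then exits with the given status.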
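
The order of the checks in stopIfNecessary matters: a quit request wins over everything, and memory is checked before the restart timestamp. A hypothetical pure helper that mirrors that order, assuming memory is compared in megabytes with >= (treat the exact comparison in memoryExceeded as an assumption):

```php
<?php
// Hypothetical helper mirroring stopIfNecessary(): returns null to keep running,
// or [method, exitStatus] describing how the worker would exit.
function stopDecision(bool $shouldQuit, float $memoryUsedMb, int $memoryLimitMb, $lastRestart, $currentRestart): ?array
{
    if ($shouldQuit) {
        return ['kill', 0];   // SIGTERM received: kill() with the default status
    }
    if ($memoryUsedMb >= $memoryLimitMb) {
        return ['stop', 12];  // memory limit exceeded: stop(12)
    }
    if ($currentRestart != $lastRestart) {
        return ['stop', 0];   // queue:restart wrote a new timestamp
    }
    return null;              // keep looping
}

var_dump(stopDecision(false, 200, 128, 1, 1)); // ['stop', 12]
```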

Fetching the Next Job

When the worker serves several queues, their names can be joined with commas on the command line; queues listed earlier have higher priority:

    protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report(new FatalThrowableError($e));
        }
    }
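
Because the loop returns as soon as one queue yields a job, a lower-priority queue is only consulted when every queue before it is empty. A sketch of that rule over hypothetical in-memory queues (started in practice with something like php artisan queue:work redis --queue=high,default,low):

```php
<?php
// In-memory sketch of getNextJob()'s priority rule; $queues is hypothetical.
function nextJob(array &$queues, string $queueList): ?array
{
    foreach (explode(',', $queueList) as $name) {
        if (! empty($queues[$name])) {
            return [$name, array_shift($queues[$name])];  // first non-empty queue wins
        }
    }
    return null;  // nothing available anywhere
}

$queues = ['high' => [], 'default' => ['job-a', 'job-b'], 'low' => ['job-c']];
[$name, $job] = nextJob($queues, 'high,default,low');
echo "{$name}: {$job}", PHP_EOL; // default: job-a
```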

$connection is the concrete queue driver; in our case it is Illuminate\Queue\RedisQueue:

    class RedisQueue extends Queue implements QueueContract
    {
        public function pop($queue = null)
        {
            $this->migrate($prefixed = $this->getQueue($queue));

            list($job, $reserved) = $this->retrieveNextJob($prefixed);

            if ($reserved) {
                return new RedisJob(
                    $this->container, $this, $job,
                    $reserved, $this->connectionName, $queue ?: $this->default
                );
            }
        }

        protected function getQueue($queue)
        {
            return 'queues:'.($queue ?: $this->default);
        }

        // ... other methods omitted
    }

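Before popping a job, the worker first moves any jobs in the delayed queue, and (when retryAfter is set) in the reserved queue, whose time has come back onto the main queue: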

    protected function migrate($queue)
    {
        $this->migrateExpiredJobs($queue.':delayed', $queue);

        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }

    public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
        );
    }

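Reading the expired jobs, removing them from the source queue, and pushing them onto the main queue are three separate operations. To keep them safe under concurrency, the code runs them in a Lua script, which Redis executes atomically: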

    public static function migrateExpiredJobs()
    {
        return <<<'LUA'
    -- Get all of the jobs with an expired "score"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto the destination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
        redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

        for i = 1, #val, 100 do
            redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        end
    end

    return val
    LUA;
    }
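
To see what the script does step by step, here is a PHP simulation of it. The data structures are hypothetical: a Redis sorted set is modelled as [member => score] and a list as a plain array. Since zrangebyscore returns the lowest-scored members, removing ranks 0 to #val-1 removes exactly those members; the chunking into groups of 100 is presumably there to avoid handing a single rpush call too many arguments:

```php
<?php
// Hypothetical simulation of the migrateExpiredJobs Lua script.
function simulateMigrateExpiredJobs(array &$zset, array &$list, int $now): array
{
    asort($zset);                                   // sorted-set members are ordered by score
    $expired = array_keys(array_filter($zset, function ($score) use ($now) {
        return $score <= $now;                      // zrangebyscore key -inf now
    }));
    foreach ($expired as $member) {
        unset($zset[$member]);                      // zremrangebyrank key 0 #val-1
    }
    foreach (array_chunk($expired, 100) as $chunk) {
        array_push($list, ...$chunk);               // rpush in chunks of 100
    }
    return $expired;
}

$delayed = ['{"id":"a"}' => 5, '{"id":"b"}' => 1, '{"id":"c"}' => 20];
$main = [];
simulateMigrateExpiredJobs($delayed, $main, 10);
echo implode(' ', $main), PHP_EOL; // {"id":"b"} {"id":"a"}
```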

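Next, the worker pops the next job from the main queue. In the same step, a copy of the job with its attempt count incremented is placed on the reserved queue, so that if the job is never deleted or released (for example, because the worker died), migrate moves it back to the main queue after retryAfter seconds and it is retried.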

    protected function retrieveNextJob($queue)
    {
        return $this->getConnection()->eval(
            LuaScripts::pop(), 2, $queue, $queue.':reserved',
            $this->availableAt($this->retryAfter)
        );
    }

    public static function pop()
    {
        return <<<'LUA'
    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false

    if(job ~= false) then
        -- Increment the attempt count and place job on the reserved queue...
        reserved = cjson.decode(job)
        reserved['attempts'] = reserved['attempts'] + 1
        reserved = cjson.encode(reserved)
        redis.call('zadd', KEYS[2], ARGV[1], reserved)
    end

    return {job, reserved}
    LUA;
    }
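
A PHP simulation of the pop script, with hypothetical in-memory structures. It returns both the original payload and the reserved copy, just as the Lua script returns {job, reserved}:

```php
<?php
// Hypothetical simulation of the pop Lua script.
function popJob(array &$list, array &$reserved, int $now, int $retryAfter): array
{
    $job = array_shift($list);                        // lpop
    if ($job === null) {
        return [null, false];                         // empty queue: reserved stays false
    }
    $payload = json_decode($job, true);
    $payload['attempts'] = $payload['attempts'] + 1;  // increment the attempt count
    $copy = json_encode($payload);
    $reserved[$copy] = $now + $retryAfter;            // zadd reserved (now + retryAfter) copy
    return [$job, $copy];
}

$main = ['{"id":1,"attempts":0}'];
$reservedSet = [];
[$job, $copy] = popJob($main, $reservedSet, 100, 90);
echo $copy, ' expires at ', $reservedSet[$copy], PHP_EOL; // {"id":1,"attempts":1} expires at 190
```

This also explains why RedisJob::attempts(), shown later, returns the decoded attempts plus one: $this->decoded is built from the original, un-incremented payload, while only the reserved copy carries the new count.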

Once a job has been fetched from Redis, it is wrapped in a RedisJob instance:

    public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
    {
        $this->job = $job;
        $this->redis = $redis;
        $this->queue = $queue;
        $this->reserved = $reserved;
        $this->container = $container;
        $this->connectionName = $connectionName;

        $this->decoded = $this->payload();
    }

    public function payload()
    {
        return json_decode($this->getRawBody(), true);
    }

    public function getRawBody()
    {
        return $this->job;
    }

Timeout Handling

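If a job runs too long, the alarm set with pcntl_alarm delivers SIGALRM, whose handler kills the current worker process. After the process is killed, the process monitor (such as Supervisor) restarts the worker, which then continues with the next job.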

    protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        if ($options->timeout > 0 && $this->supportsAsyncSignals()) {
            pcntl_signal(SIGALRM, function () {
                $this->kill(1);
            });

            pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep);
        }
    }

    protected function timeoutForJob($job, WorkerOptions $options)
    {
        return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
    }
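
The number of seconds passed to pcntl_alarm can be sketched as a pure function. alarmSeconds is hypothetical; $jobTimeout stands for $job->timeout() (null when there is no job or the job class sets no $timeout):

```php
<?php
// Hypothetical pure version of the alarm computation in registerTimeoutHandler().
function alarmSeconds(?int $jobTimeout, int $optionTimeout, int $sleep): ?int
{
    if ($optionTimeout <= 0) {
        return null;  // --timeout=0: no alarm is registered at all
    }
    // A per-job timeout overrides --timeout; --sleep is added on top.
    $timeout = $jobTimeout !== null ? $jobTimeout : $optionTimeout;
    return $timeout + $sleep;
}

echo alarmSeconds(null, 60, 3), PHP_EOL; // 63
echo alarmSeconds(120, 60, 3), PHP_EOL;  // 123
```

Since each call to pcntl_alarm cancels any previously set alarm, the timer is effectively reset on every loop iteration. Adding --sleep presumably keeps the alarm from firing while the worker sleeps after an empty poll, since the handler is registered before the worker knows whether a job arrived.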

Processing a Job

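Two events, JobProcessing and JobProcessed, are fired before and after the job runs; anything that wants to react to them has to register a listener beforehand (as WorkCommand did for JobProcessed above):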

    protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            return $this->process($connectionName, $job, $options);
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report(new FatalThrowableError($e));
        }
    }

    public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
            $this->raiseBeforeJobEvent($connectionName, $job);

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );

            $job->fire();

            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }

Before-Job and After-Job Events

raiseBeforeJobEvent fires the event before a job is processed, and raiseAfterJobEvent fires the event after it has been processed:

    protected function raiseBeforeJobEvent($connectionName, $job)
    {
        $this->events->fire(new Events\JobProcessing(
            $connectionName, $job
        ));
    }

    protected function raiseAfterJobEvent($connectionName, $job)
    {
        $this->events->fire(new Events\JobProcessed(
            $connectionName, $job
        ));
    }

Job Exception Handling

[Figure 2]

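A job may throw an exception while running. In that case the worker checks whether the job's attempt count has exceeded the limit. If it has not, the job is put back on the queue; if it has, the job is marked as failed and removed from the reserved queue.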

Job Failure

markJobAsFailedIfAlreadyExceedsMaxAttempts runs before the job itself and checks whether the job has already been attempted more times than allowed:

    // Worker
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
    {
        // A maxTries value in the payload (the job's $tries) overrides --tries.
        $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($maxTries === 0 || $job->attempts() <= $maxTries) {
            return;
        }

        $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times. The job may have previously timed out.'
        ));

        throw $e;
    }

    // Methods on the job object ($job is a RedisJob)
    public function maxTries()
    {
        return array_get($this->payload(), 'maxTries');
    }

    public function attempts()
    {
        return Arr::get($this->decoded, 'attempts') + 1;
    }

    // Worker
    protected function failJob($connectionName, $job, $e)
    {
        return FailingJob::handle($connectionName, $job, $e);
    }
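
The pre-run check reduces to a small piece of arithmetic. The helpers below are hypothetical pure versions of it, where $attempts is what attempts() returns and 0 means "no limit":

```php
<?php
// Hypothetical pure versions of the pre-run max-attempts check.
function resolveMaxTries(?int $jobMaxTries, int $optionMaxTries): int
{
    return $jobMaxTries !== null ? $jobMaxTries : $optionMaxTries;  // job's $tries wins
}

function alreadyExceeds(int $attempts, int $maxTries): bool
{
    return ! ($maxTries === 0 || $attempts <= $maxTries);
}

$maxTries = resolveMaxTries(null, 3);   // no $tries on the job, --tries=3
foreach ([3, 4] as $attempt) {
    echo $attempt, ': ', alreadyExceeds($attempt, $maxTries) ? 'fail before running' : 'run', PHP_EOL;
}
```

A job whose worker was killed by the timeout never reaches the exception handler, so the only thing that keeps counting is the attempts value in its reserved copy. With --tries=3, the pre-run check catches such a job on its fourth pop, which is why the exception message says the job "may have previously timed out".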

When a job has been attempted more times than allowed, the worker calls FailingJob:

    // Worker
    protected function failJob($connectionName, $job, $e)
    {
        return FailingJob::handle($connectionName, $job, $e);
    }

    // FailingJob
    public static function handle($connectionName, $job, $e = null)
    {
        $job->markAsFailed();

        if ($job->isDeleted()) {
            return;
        }

        try {
            $job->delete();

            $job->failed($e);
        } finally {
            static::events()->fire(new JobFailed(
                $connectionName, $job, $e ?: new ManuallyFailedException
            ));
        }
    }

    // Methods on the job object ($job is a RedisJob)
    public function markAsFailed()
    {
        $this->failed = true;
    }

    public function delete()
    {
        parent::delete();

        $this->redis->deleteReserved($this->queue, $this);
    }

    public function isDeleted()
    {
        return $this->deleted;
    }

FailingJob marks the job as failed and deleted, and removes it from the reserved queue so that it is never retried:

    public function deleteReserved($queue, $job)
    {
        $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
    }

FailingJob also calls RedisJob's failed method and fires the JobFailed event:

    public function failed($e)
    {
        $this->markAsFailed();

        $payload = $this->payload();

        list($class, $method) = JobName::parse($payload['job']);

        if (method_exists($this->instance = $this->resolve($class), 'failed')) {
            $this->instance->failed($payload['data'], $e);
        }
    }

failed resolves the job class named in the payload that was stored in Redis earlier:

    [
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => isset($job->tries) ? $job->tries : null,
        'timeout' => isset($job->timeout) ? $job->timeout : null,
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job),
        ],
    ];
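
A runnable sketch of that payload for a hypothetical SendWelcomeEmail job class, followed by the unserialize step that failed() and call() perform later. The createPayload function here is a stand-alone illustration, not Laravel's method; the pop Lua script above also expects an attempts field, which the Redis driver adds when pushing and which is omitted here:

```php
<?php
// Hypothetical job class; only $tries is set, so timeout stays null.
class SendWelcomeEmail
{
    public $tries = 3;
    public $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }
}

// Builds the array shape shown above.
function createPayload($job): array
{
    return [
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => isset($job->tries) ? $job->tries : null,
        'timeout' => isset($job->timeout) ? $job->timeout : null,
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job),
        ],
    ];
}

$payload = createPayload(new SendWelcomeEmail(42));
$command = unserialize($payload['data']['command']);  // what failed()/call() do later
echo $payload['maxTries'], ' ', get_class($command), ' ', $command->userId, PHP_EOL; // 3 SendWelcomeEmail 42
```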

Now let's look at CallQueuedHandler's failed method:

    public function failed(array $data, $e)
    {
        $command = unserialize($data['command']);

        if (method_exists($command, 'failed')) {
            $command->failed($e);
        }
    }

As you can see, the worker ultimately calls the failed method of the job class.

Exception Handling

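When a job throws an exception, the worker again checks the job's attempt count: if it has reached or exceeded the limit, the job is not retried again and is marked as failed; otherwise it is released back onto the queue, and the exception is logged.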

    // Worker
    protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
    {
        try {
            $this->markJobAsFailedIfWillExceedMaxAttempts(
                $connectionName, $job, (int) $options->maxTries, $e
            );

            $this->raiseExceptionOccurredJobEvent(
                $connectionName, $job, $e
            );
        } finally {
            // A failed job was already deleted; anything else goes back for a retry.
            if (! $job->isDeleted()) {
                $job->release($options->delay);
            }
        }

        throw $e;
    }

    protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
    {
        $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
            $this->failJob($connectionName, $job, $e);
        }
    }

    // RedisJob
    public function release($delay = 0)
    {
        parent::release($delay);

        $this->redis->deleteAndRelease($this->queue, $this, $delay);
    }

    // RedisQueue
    public function deleteAndRelease($queue, $job, $delay)
    {
        $queue = $this->getQueue($queue);

        $this->getConnection()->eval(
            LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
            $job->getReservedJob(), $this->availableAt($delay)
        );
    }
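
Note the different comparison from the pre-run check: before running, a job fails only once attempts exceeds maxTries (<=), while after an exception it fails as soon as attempts reaches maxTries (>=). A hypothetical pure version of the post-exception check:

```php
<?php
// Hypothetical pure version of markJobAsFailedIfWillExceedMaxAttempts();
// maxTries = 0 means "retry forever".
function willExceed(int $attempts, int $maxTries): bool
{
    return $maxTries > 0 && $attempts >= $maxTries;
}

// With --tries=3: attempts 1 and 2 are released for retry, attempt 3 fails the job.
foreach ([1, 2, 3] as $attempt) {
    echo $attempt, ': ', willExceed($attempt, 3) ? 'fail' : 'release', PHP_EOL;
}
```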

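As soon as a job throws, it is moved from the reserved queue to the delayed queue and the exception is rethrown; the rethrown exception is then reported to the log by runJob's catch block.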

    public static function release()
    {
        return <<<'LUA'
    -- Remove the job from the current queue...
    redis.call('zrem', KEYS[2], ARGV[1])

    -- Add the job onto the "delayed" queue...
    redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])

    return true
    LUA;
    }
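
A PHP simulation of the release script, using hypothetical in-memory sorted sets ([member => score]). The member being moved is the reserved copy, whose attempts count was already incremented by pop, so when migrate later moves it back to the main queue the count carries over to the next attempt:

```php
<?php
// Hypothetical simulation of the release Lua script.
function releaseJob(array &$delayed, array &$reserved, string $reservedJob, int $now, int $delay): bool
{
    unset($reserved[$reservedJob]);          // zrem queues:<name>:reserved member
    $delayed[$reservedJob] = $now + $delay;  // zadd queues:<name>:delayed (now + delay) member
    return true;
}

$reserved = ['{"id":1,"attempts":1}' => 190];
$delayed = [];
releaseJob($delayed, $reserved, '{"id":1,"attempts":1}', 100, 2);
print_r($delayed); // the reserved copy, due at 102
```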

Running the Job

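Running a job starts with $job->fire(), which parses the job name stored in the payload (Illuminate\Queue\CallQueuedHandler@call) and invokes CallQueuedHandler's call method: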

    // Job
    public function fire()
    {
        $payload = $this->payload();

        list($class, $method) = JobName::parse($payload['job']);

        with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

    // CallQueuedHandler
    public function call(Job $job, array $data)
    {
        $command = $this->setJobInstanceIfNecessary(
            $job, unserialize($data['command'])
        );

        $this->dispatcher->dispatchNow(
            $command, $handler = $this->resolveHandler($job, $command)
        );

        // Delete the job unless the handler already deleted or released it.
        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }
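
The "Class@method" string drives the whole call. A sketch of that dispatch step: the real code uses JobName::parse() and resolves the class from the container, while this version simply splits on "@" and instantiates the class directly. FakeHandler is hypothetical, and the fallback method name "fire" is our understanding of JobName::parse's default in Laravel 5.x, so treat it as an assumption:

```php
<?php
// Hypothetical handler standing in for CallQueuedHandler.
class FakeHandler
{
    public function call($job, array $data): string
    {
        return "handled {$data['commandName']}";
    }
}

function parseJobName(string $name): array
{
    $parts = explode('@', $name, 2);
    return [$parts[0], $parts[1] ?? 'fire'];  // assumed default method when no "@" is present
}

[$class, $method] = parseJobName('FakeHandler@call');
echo (new $class)->{$method}(null, ['commandName' => 'SendWelcomeEmail']), PHP_EOL; // handled SendWelcomeEmail
```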

setJobInstanceIfNecessary hands the job instance to the command, but only if the command class uses the InteractsWithQueue trait:

    protected function setJobInstanceIfNecessary(Job $job, $instance)
    {
        if (in_array(InteractsWithQueue::class, class_uses_recursive(get_class($instance)))) {
            $instance->setJob($job);
        }

        return $instance;
    }

    trait InteractsWithQueue
    {
        public function setJob(JobContract $job)
        {
            $this->job = $job;

            return $this;
        }
    }
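
The recursive lookup matters because PHP's built-in class_uses() only reports traits declared directly on the given class, not those of its parents. Below is a simplified, hypothetical stand-in for Laravel's class_uses_recursive helper, together with a minimal copy of the trait:

```php
<?php
// Simplified stand-in for class_uses_recursive(): traits of the class,
// its parents, and traits used by those traits.
function traitsOf(string $class): array
{
    $traits = [];
    foreach (array_merge([$class], array_values(class_parents($class))) as $c) {
        $stack = array_values(class_uses($c));
        while ($stack) {
            $trait = array_pop($stack);
            if (! isset($traits[$trait])) {
                $traits[$trait] = $trait;
                $stack = array_merge($stack, array_values(class_uses($trait)));
            }
        }
    }
    return $traits;
}

trait InteractsWithQueue  // hypothetical, minimal copy of the trait
{
    public $job;
    public function setJob($job) { $this->job = $job; return $this; }
}

class BaseJob { use InteractsWithQueue; }
class ChildJob extends BaseJob {}

$command = new ChildJob();
if (in_array(InteractsWithQueue::class, traitsOf(get_class($command)))) {
    $command->setJob('the-job-instance');
}
echo $command->job, PHP_EOL; // the-job-instance
```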

The command is then executed by the Dispatcher's dispatchNow method:

    public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command);
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }

    public function getCommandHandler($command)
    {
        if ($this->hasCommandHandler($command)) {
            return $this->container->make($this->handlers[get_class($command)]);
        }

        return false;
    }

    public function hasCommandHandler($command)
    {
        return array_key_exists(get_class($command), $this->handlers);
    }

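If no handler has been registered on the dispatcher with map, getCommandHandler returns false, so the container calls the job class's own handle method, which contains the actual business logic.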
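
The branching and the pipeline can be reproduced in miniature. MiniDispatcher below is hypothetical: it instantiates handlers with new instead of the container, and its pipes are plain closures rather than Laravel pipe classes, but it follows the same rule (mapped handler if one exists, otherwise the command's own handle) and wraps the final callback so the first pipe runs first:

```php
<?php
// Hypothetical miniature of Dispatcher::dispatchNow().
class MiniDispatcher
{
    private $handlers = [];
    private $pipes = [];

    public function map(array $map): void { $this->handlers = array_merge($this->handlers, $map); }
    public function pipeThrough(array $pipes): void { $this->pipes = $pipes; }

    public function dispatchNow($command)
    {
        $class = get_class($command);
        if (isset($this->handlers[$class])) {
            $handlerClass = $this->handlers[$class];
            $handler = new $handlerClass();
            $callback = function ($command) use ($handler) { return $handler->handle($command); };
        } else {
            $callback = function ($command) { return $command->handle(); };
        }
        // Wrap from the inside out so the first pipe is the outermost layer.
        $next = array_reduce(array_reverse($this->pipes), function ($next, $pipe) {
            return function ($command) use ($next, $pipe) { return $pipe($command, $next); };
        }, $callback);
        return $next($command);
    }
}

class SendWelcomeEmail { public function handle() { return 'job handled itself'; } }
class SendWelcomeEmailHandler { public function handle($command) { return 'mapped handler'; } }

$dispatcher = new MiniDispatcher();
echo $dispatcher->dispatchNow(new SendWelcomeEmail()), PHP_EOL; // job handled itself
```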

When the job finishes (and was neither deleted nor released by the handler), delete is called:

    public function delete()
    {
        parent::delete();

        $this->redis->deleteReserved($this->queue, $this);
    }

    public function deleteReserved($queue, $job)
    {
        $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
    }

This way, a job that ran successfully is removed from the reserved queue.